package cn.slimsmart.thrift.rpc.demo.client;


import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.demo.service.EchoSerivce;
import cn.slimsmart.thrift.rpc.demo.service.HelloWorldService;
import cn.slimsmart.thrift.rpc.demo.service.EchoSerivce.AsyncClient.echo_call;
import cn.slimsmart.thrift.rpc.demo.service.HelloWorldService.AsyncClient.sayHello_call;

/**
 * blog http://www.micmiu.com
 *
 * @author Michael
 *
 */
public class MuiltClientDemo {

	public static final String SERVER_IP = "localhost";
	public static final int SERVER_PORT = 8090;
	public static final int TIMEOUT = 30000;
	
	private static boolean[] callIsRuning=new boolean[]{false};
	

	/**
	 *
	 * @param userName
	 */
	public void startClient(String userName) {
//		startSimpleClient();
		startNoBlockClient();
		startHaHsClient();
		startThreadPoolClent();
		startTThreadedSelectorClent();
		startClient2("d");
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		MuiltClientDemo client = new MuiltClientDemo();
		client.startClient("");

	}

	
	public static void startSimpleClient(){
		try {
			TTransport transport = new TFramedTransport(new TSocket(SERVER_IP,10000, TIMEOUT));
			
//			TTransport transport = new TSocket(SERVER_IP, 10000, TIMEOUT);
			// 协议要和服务端一致
			TProtocol protocol = new TCompactProtocol(transport);
			// TProtocol protocol = new TCompactProtocol(transport);
			// TProtocol protocol = new TJSONProtocol(transport);
			HelloWorldService.Client client = new HelloWorldService.Client(protocol);
			transport.open();
			String result = client.sayHello("ddddd");
			System.out.println("Thrify client result =: " + result);
			transport.close();  
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void startNoBlockClient(){
		//客户端和服务端要使用同一中 Protocol 和 Transport，否则会抛出异常
		try {
			TTransport transport = new TFramedTransport(new TSocket(SERVER_IP,10004, TIMEOUT));
			// 协议要和服务端一致
			TProtocol protocol = new TCompactProtocol(transport);
			TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"EchoSerivce");
			EchoSerivce.Client client = new EchoSerivce.Client(mp1);
			
			
			TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol,"HelloWorldService");
			HelloWorldService.Client client2 = new HelloWorldService.Client(mp2);
			transport.open();
			String result = client.echo("dddd");
			System.out.println("Thrify client result =: " + result);
			
			String result2 = client2.sayHello("hello");
			System.out.println("Thrify client result2 =: " + result2);
			
			
			transport.close();  
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void startHaHsClient(){
		//客户端和服务端要使用同一中 Protocol 和 Transport，否则会抛出异常
		try {
			TTransport transport = new TFramedTransport(new TSocket(SERVER_IP,10002, TIMEOUT));
			// 协议要和服务端一致
			TProtocol protocol = new TCompactProtocol(transport);
			
			
			TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"EchoSerivce");
			EchoSerivce.Client client = new EchoSerivce.Client(mp1);
			
			
			TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol,"HelloWorldService");
			HelloWorldService.Client client2 = new HelloWorldService.Client(mp2);
			transport.open();
			String result = client.echo("dddd");
			System.out.println("Thrify client result =: " + result);
			
			String result2 = client2.sayHello("hello");
			System.out.println("Thrify client result2 =: " + result2);
			
			
			transport.close();  
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void startThreadPoolClent(){
		//客户端和服务端要使用同一中 Protocol 和 Transport，否则会抛出异常
		try {
			TTransport transport = new TFramedTransport(new TSocket(SERVER_IP,10003, TIMEOUT));
			// 协议要和服务端一致
			TProtocol protocol = new TCompactProtocol(transport);
			TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"EchoSerivce");
			EchoSerivce.Client client = new EchoSerivce.Client(mp1);
			
			
			TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol,"HelloWorldService");
			HelloWorldService.Client client2 = new HelloWorldService.Client(mp2);
			transport.open();
			String result = client.echo("dddd");
			System.out.println("Thrify client result =: " + result);
			
			String result2 = client2.sayHello("hello");
			System.out.println("Thrify client result2 =: " + result2);
			transport.close();  
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void startTThreadedSelectorClent(){
		//客户端和服务端要使用同一中 Protocol 和 Transport，否则会抛出异常
		try {
			TTransport transport = new TFramedTransport(new TSocket(SERVER_IP,10004, TIMEOUT));
			// 协议要和服务端一致
			TProtocol protocol = new TCompactProtocol(transport);
			TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"EchoSerivce");
			EchoSerivce.Client client = new EchoSerivce.Client(mp1);
			
			
			TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol,"HelloWorldService");
			HelloWorldService.Client client2 = new HelloWorldService.Client(mp2);
			transport.open();
			String result = client.echo("dddd");
			System.out.println("Thrify client result =: " + result);
			
			String result2 = client2.sayHello("hello");
			System.out.println("Thrify client result2 =: " + result2);
			transport.close();  
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	
	
	/**
	 *
	 * @param userName
	 */
	public static void startClient2(String userName) {
		try {
			TAsyncClientManager clientManager = new TAsyncClientManager();
			TNonblockingTransport transport = new TNonblockingSocket(SERVER_IP,10004, TIMEOUT);
			//open的方法不能调用
			//transport.open();
			TProtocol protocol = new TCompactProtocol(transport);
//			final TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"EchoSerivce");
			TProtocolFactory tProtocolFactory=new TProtocolFactory() {
				private static final long serialVersionUID = 1L;
				@Override
				public TProtocol getProtocol(TTransport trans) {
					return new TMultiplexedProtocol(new TCompactProtocol(trans),"EchoSerivce");
				}
			};
			
			EchoSerivce.AsyncClient asyncClient2=new EchoSerivce.AsyncClient.Factory(clientManager, tProtocolFactory).getAsyncClient(transport);
			
//			EchoSerivce.AsyncClient asyncClient2=new EchoSerivce.AsyncClient(tProtocolFactory, clientManager, transport);
			System.out.println("Client start .....");
			CountDownLatch latch = new CountDownLatch(1);
			AsynCallback callBack = new AsynCallback(latch);
			System.out.println("call method sayHello start ...");
			call(asyncClient2,latch,callBack);
			call(asyncClient2,latch,callBack);
			transport.close();
			call(asyncClient2,latch,callBack);
//			asyncClient2.echo(userName, callBack);
//			asyncClient2.echo(userName, callBack);
//			asyncClient2.echo(userName, callBack);
			

			System.out.println("call method sayHello .... end");
			boolean wait = latch.await(30, TimeUnit.SECONDS);
			System.out.println("latch.await =:" + wait);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("startClient end.");
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private static void call(final EchoSerivce.AsyncClient asyncClient,CountDownLatch latch,final AsynCallback callBack){
		Executors.newSingleThreadExecutor().execute(new Runnable() {
			@Override
			public void run() {
				synchronized (SERVER_IP) {
					try {
						while(callIsRuning[0]){
							Thread.sleep(100);
						}
						callIsRuning[0]=true;
						asyncClient.echo("ddd", callBack);
						System.err.println("1111111");
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		});

	}
	
	public static class AsynCallback implements AsyncMethodCallback<echo_call> {
		private CountDownLatch latch;

		public AsynCallback(CountDownLatch latch) {
			this.latch = latch;
		}

		@Override
		public void onComplete(echo_call response) {
			System.out.println("onComplete");
			try {
				// Thread.sleep(1000L * 1);
				System.out.println("AsynCall result =:"
						+ response.getResult().toString());
			} catch (TException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				latch.countDown();
				callIsRuning[0]=false;
			}
		}

		@Override
		public void onError(Exception exception) {
			System.out.println("onError :" + exception.getMessage());
			latch.countDown();
		}

	}
	
}
